package com.rabbitmq.origin.api;

import com.rabbitmq.client.*;
import com.rabbitmq.common.utils.RabbitMqUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;

import java.io.IOException;

/**
 * @author
 * @Describe 消费topic类型消息。
 * @date
 */
@Slf4j(topic = "RabbitMqConsumerTopic")
public class RabbitMqConsumerTopic {

    /**
     * 消费匹配的路由消息
     *
     * @param exchangeName
     */
    public void consumerTopic(String exchangeName, int flag, String... routingKeys) {
        Connection conn = null;
        Channel channel = null;
        try {
            // 获取连接
            conn = RabbitMqUtil.getConn();
            // 创建通道（通道可用于消息的发送和接收）
            channel = conn.createChannel();

            // 声明主题类型的交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true);

            // 声明临时队列，非持久，独占，自动删除队列(客户端停止，对应的临时队列会自动删除)，每次都是一个新队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列,该模式绑定的路由key支持和生产者交换机发送的路由key进行通配符匹配
            if (ArrayUtils.isEmpty(routingKeys)) {
                // 此处routingkey设置为#百事匹配所有的发送的路由key
                channel.queueBind(queueName, exchangeName, "#");
            } else {
                // 一个队列绑定多个路由key
                for (int i = 0; i < routingKeys.length; i++) {
                    channel.queueBind(queueName, exchangeName, routingKeys[i]);
                }
            }

            // 创建队列消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    if (flag % 3 == 1) {
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            log.error("Thread.sleep 异常", e);
                        }
                    }
                    log.info(" consumer-" + flag + " handleDelivery " + envelope.getRoutingKey() + ":'" + message + "'");
                }
            };

            // 消息消费，自动确认
            channel.basicConsume(queueName, true, "topicConsumer", consumer);
        } catch (Exception e) {
            log.error("消费消息异常,", e);
            // 资源关闭
            RabbitMqUtil.close(conn, channel);
        }
    }
}
